package com.atguigu.day05;

import com.atguigu.beans.WaterSensor;
import com.atguigu.func.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Demonstrates a tumbling processing-time window whose elements are aggregated with
 * {@code reduce}.
 *
 * <p>Requirement: for each sensor, output the sum of the water levels collected during the
 * most recent 10 seconds.
 *
 * @author Felix
 * @date 2024/4/3
 */
public class Flink04_window_reduce {
    /**
     * Builds and submits the streaming job: socket text source -> {@link WaterSensor}
     * conversion -> key by sensor id -> 10s tumbling processing-time window -> reduce -> print.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // Reducer applied inside each window: logs both operands, then emits a new record that
        // keeps the first operand's id, is stamped with the current wall-clock time and carries
        // the sum of both vc values.
        ReduceFunction<WaterSensor> vcSummer = new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor accumulated, WaterSensor incoming) throws Exception {
                System.out.println("中间累加的结果:" + accumulated);
                System.out.println("新来的数据:" + incoming);
                return new WaterSensor(
                        accumulated.id,
                        System.currentTimeMillis(),
                        accumulated.vc + incoming.vc);
            }
        };

        // Step 1: obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 2: use a single parallel instance.
        env.setParallelism(1);

        // Steps 3-5: read text lines from the socket, convert String -> WaterSensor,
        // then group the stream by sensor id.
        KeyedStream<WaterSensor, String> sensorsById = env
                .socketTextStream("hadoop102", 8888)
                .map(new WaterSensorMapFunction())
                .keyBy(WaterSensor::getId);

        // Steps 6-7: assign 10-second tumbling processing-time windows and aggregate the
        // contents of each window with the reducer defined above.
        SingleOutputStreamOperator<WaterSensor> summedPerWindow = sensorsById
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(vcSummer);

        // Step 8: print the aggregated results.
        summedPerWindow.print();

        // Step 9: submit the job.
        env.execute();
    }
}
